S3バケットにあるJSONで1行に繋がった形式のレコードをLambdaのテストイベントでJSON Lines形式に変換して出力する(Python3.9)
こんにちは、洲崎です。
S3バケットにあるJSONで1行に繋がった形式のレコードをLambda(Python3.9)のテストイベントで、JSON Lines形式に変換して出力してみたので共有します。
S3バケットにある全オブジェクトを一斉にパースさせたい
前回の記事の続きになりますが、Kinesis Data Firehoseをデフォルトの設定のまま利用すると、複数のJSONが1行にまとまってAthenaやQuickSightで処理できない問題がありました。
Kinesis Data Firehoseの動的パーティショニングの設定を使うことで、Kinesis Data Firehoseからパースできるのですが、すでに既存データがS3に溜まってしまっている場合、別途パース処理が必要です。
今回はLambda(Python3.9)を利用して、S3バケットにある全オブジェクトをパースしてみました。
Lambda
サンプルコード
S3バケットの指定の部分を対象のバケット名に変えるだけで動作するようになっています。
別のS3バケットに出力する場合はsourceとdestをそれぞれ指定します。
※気になる点等あればフィードバックいただけると幸いです。
既存のS3オブジェクトを上書きで保存する場合
import boto3 s3 = boto3.resource('s3') def lambda_handler(event, context): #S3バケット名を指定 bucket_name = 'sample-bucket' # S3バケット内の全オブジェクトのキーをリストアップ bucket = s3.Bucket(bucket_name) # 各オブジェクトを読み込み、改行を追加してJSONL形式に変換して上書きする for obj in bucket.objects.all(): # 元のオブジェクト内容を取得 response = obj.get() contents = response['Body'].read().decode('utf-8') # JSON文字列を改行区切りのJSONL形式に変換 new_data = contents.replace('}{', '}\n{') # 最後の改行コードを削除 if new_data.endswith('\n'): new_data = new_data[:-1] # 変換したJSONL形式を上書きして保存 obj.put(Body=new_data)
別のS3バケットに出力する場合
import boto3 s3 = boto3.resource('s3') def lambda_handler(event, context): #S3バケット名を指定 source_bucket_name = 'source_bucket' dest_bucket_name = 'dest_bucket' # S3バケット内の全オブジェクトのキーをリストアップ source_bucket = s3.Bucket(source_bucket_name) # 各オブジェクトを読み込み、改行を追加してJSONL形式に変換し、別のバケットへ保存する for object in source_bucket.objects.all(): # 元のオブジェクト内容を取得 obj = s3.Object(source_bucket_name, object.key) response = obj.get() contents = response['Body'].read().decode('utf-8') # JSON文字列を改行区切りのJSONL形式に変換 new_data = contents.replace('}{', '}\n{') # 最後の改行コードを削除 if new_data.endswith('\n'): new_data = new_data[:-1] # 変換したJSONL形式を別のバケットに保存 dest_obj = s3.Object(dest_bucket_name, object.key) dest_obj.put(Body=new_data)
注意点
現時点でパッと思いつく注意点を上げます。
- Lambdaのテストイベントで実行することを想定している為、Firehoseに設定したり、S3のプットイベント等の場合はコードの修正が必要になります。
- S3のGetやPut料金が別途発生します。
- オブジェクトの量によってタイムアウトやメモリ量等、Lambdaの性能面を考慮する必要があります。
- あまりにオブジェクトの量が多い場合は分割してLambda処理を行うことや、Lambda以外の方法を考える必要があります。
最後に
今回はS3バケットにある全オブジェクトを対象として、JSONで1行に繋がった形式のレコードをLambda(Python3.9)のテストイベントで、JSON Lines形式に変換して出力してみました。
もし同様の事象で困っている方がいましたら参考にしてみてください。
ではまた!コンサルティング部の洲崎でした。